package com.wudl.flink.examples;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.util.concurrent.ExecutionException;

/**
 * Flink SQL example that copies every row of one MySQL table into another MySQL table
 * through the JDBC connector.
 *
 * <p>The program registers the MySQL tables {@code student} (source) and {@code SinkStudent}
 * (sink) as Flink tables, selects {@code id} and {@code name} from the source and inserts the
 * result into the sink.
 *
 * <p>Created: 2021-08-24 23:28
 *
 * @author wudl
 */
public class FlinkSqlMysqlToMySql02 {

    /** JDBC URL of the MySQL database that holds both the source and the sink table. */
    private static final String DB_URL = "jdbc:mysql://192.168.1.180:3306/MyFlink";

    // NOTE(review): credentials are hard-coded for this example only; load them from
    // configuration or the environment before running against a real database.
    private static final String USER_NAME = "root";
    private static final String PASSWORD = "123456";

    /**
     * Column list shared by the source and the sink table. {@code STRING} is used on purpose:
     * a bare {@code VARCHAR} without a length is {@code VARCHAR(1)} in Flink SQL.
     */
    private static final String SCHEMA = "id INT, name STRING";

    /** Name of the MySQL table that is read. */
    private static final String SOURCE_TABLE = "student";

    /** Name of the MySQL table that is written. */
    private static final String SINK_TABLE = "SinkStudent";

    /**
     * Registers both JDBC tables, submits the copy job and blocks until it has finished.
     *
     * @param args command-line arguments; not used
     * @throws IllegalStateException if the copy job fails or the wait for it is interrupted
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // 1. Select the SQL dialect.
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // 2. Register the MySQL tables under the names used by the queries below.
        createJdbcTable(tableEnv, "sourceTable", SOURCE_TABLE);
        createJdbcTable(tableEnv, "sinkTable", SINK_TABLE);

        // 3. Build the query and write its result into the sink table.
        //    To inspect the rows while debugging, call resultTable.execute().print().
        Table resultTable = tableEnv.sqlQuery("select id, name from sourceTable");
        TableResult insertResult = resultTable.executeInsert("sinkTable");

        // The insert job is submitted asynchronously; wait for it so that the success message
        // is only printed once the rows have actually been written, and failures are reported.
        try {
            insertResult.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the MySQL copy job", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("The MySQL copy job failed", e);
        }

        System.out.println("数据写入mysql成功");
    }

    /**
     * Registers a MySQL table as a Flink table backed by the JDBC connector.
     *
     * @param tableEnv       table environment in which the table is registered
     * @param flinkTableName name under which the table is visible to Flink SQL
     * @param mysqlTableName name of the physical table in the MySQL database
     */
    private static void createJdbcTable(
            StreamTableEnvironment tableEnv, String flinkTableName, String mysqlTableName) {
        tableEnv.executeSql(
                "CREATE TABLE " + flinkTableName + " (" + SCHEMA + ") WITH ("
                        + "'connector' = 'jdbc', "
                        + "'url' = '" + DB_URL + "', "
                        + "'table-name' = '" + mysqlTableName + "', "
                        + "'username' = '" + USER_NAME + "', "
                        + "'password' = '" + PASSWORD + "'"
                        + ")");
    }
}
